OpenClaw 自动化工作流设计模式
自动化工作流是 OpenClaw 实现高效任务处理的核心。本文详解 cron 定时任务、heartbeat 心跳机制、工作流编排等设计模式,帮助你构建可靠的自动化系统。
概述
OpenClaw 提供多种自动化机制:
- ⏰ Cron 定时任务 - 精确时间调度的任务
- 💓 Heartbeat 心跳 - 周期性检查与批处理
- 🔄 工作流编排 - 多步骤任务协调
- 🎯 事件驱动 - 响应外部事件触发
- 📊 条件执行 - 基于状态的决策
一、Cron 定时任务
1.1 Cron 基础
OpenClaw 的 cron 系统支持三种调度类型:
javascript
// 1. 一次性任务(at)
{
kind: "at",
at: "2026-03-20T08:00:00+08:00" // ISO-8601 格式
}
// 2. 周期性任务(every)
{
kind: "every",
everyMs: 3600000, // 1 小时
anchorMs: 1710900000000 // 可选:起始时间
}
// 3. Cron 表达式(cron)
{
kind: "cron",
expr: "0 8 * * *", // 每天 8:00
tz: "Asia/Shanghai" // 时区
}1.2 创建定时任务
javascript
// 创建每日早报任务
await cron({
action: 'add',
job: {
name: '每日早报',
schedule: {
kind: 'cron',
expr: '0 8 * * *', // 每天 8:00
tz: 'Asia/Shanghai'
},
payload: {
kind: 'agentTurn',
message: '生成每日早报:百度热搜 + 驻马店天气',
timeoutSeconds: 300
},
sessionTarget: 'isolated', // 隔离会话
enabled: true
}
})
// 创建每小时检查任务
await cron({
action: 'add',
job: {
name: '每小时检查',
schedule: {
kind: 'every',
everyMs: 3600000 // 1 小时
},
payload: {
kind: 'systemEvent',
text: 'HEARTBEAT: 检查邮箱、日历、通知'
},
sessionTarget: 'main' // 主会话
}
})
// 创建一次性提醒
const reminderTime = new Date()
reminderTime.setHours(reminderTime.getHours() + 1)
await cron({
action: 'add',
job: {
name: '一小时后提醒',
schedule: {
kind: 'at',
at: reminderTime.toISOString()
},
payload: {
kind: 'agentTurn',
message: '提醒:休息眼睛,起来活动一下',
timeoutSeconds: 60
},
sessionTarget: 'isolated',
delivery: {
mode: 'announce' // 通知用户
}
}
})1.3 管理定时任务
javascript
// 列出所有任务
async function listCronJobs() {
return await cron({
action: 'list',
includeDisabled: true
})
}
// 获取任务状态
async function getCronStatus() {
return await cron({ action: 'status' })
}
// 立即触发任务
async function runCronJob(jobId) {
return await cron({
action: 'run',
jobId,
runMode: 'force' // force | due
})
}
// 更新任务
async function updateCronJob(jobId, patch) {
return await cron({
action: 'update',
jobId,
patch
})
}
// 删除任务
async function removeCronJob(jobId) {
return await cron({
action: 'remove',
jobId
})
}
// 获取运行历史
async function getCronHistory(jobId) {
return await cron({
action: 'runs',
jobId
})
}1.4 实战案例 1:智能早报系统
javascript
class MorningBriefing {
constructor() {
this.enabled = true
}
// 注册定时任务
async register() {
await cron({
action: 'add',
job: {
name: '每日早报',
schedule: {
kind: 'cron',
expr: '0 8 * * *',
tz: 'Asia/Shanghai'
},
payload: {
kind: 'agentTurn',
message: '生成每日早报',
timeoutSeconds: 300
},
sessionTarget: 'isolated',
delivery: {
mode: 'announce'
}
}
})
console.log('✅ 每日早报任务已注册')
}
// 生成早报
async generate() {
console.log('📰 生成每日早报...')
const sections = []
// 1. 日期信息
const today = new Date()
sections.push(`# 每日早报\n\n`)
sections.push(`📅 ${today.toLocaleDateString('zh-CN', {
year: 'numeric',
month: 'long',
day: 'numeric',
weekday: 'long'
})}\n\n`)
// 2. 天气
const weather = await this.getWeather('驻马店')
sections.push(`## 🌤️ 天气\n\n${weather}\n\n`)
// 3. 百度热搜
const hotSearch = await this.getHotSearch()
sections.push(`## 🔥 百度热搜\n\n${hotSearch}\n\n`)
// 4. 日历事件
const events = await this.getCalendarEvents()
if (events.length > 0) {
sections.push(`## 📅 今日日程\n\n${events}\n\n`)
}
// 5. 待办提醒
const todos = await this.getPendingTodos()
if (todos.length > 0) {
sections.push(`## ✅ 今日待办\n\n${todos}\n\n`)
}
// 6. 励志语录
const quote = await this.getDailyQuote()
sections.push(`## 💭 每日一句\n\n${quote}\n\n`)
// 合并并发送
const briefing = sections.join('')
await message({
action: 'send',
target: '老大',
message: briefing
})
console.log('✅ 早报已发送')
return briefing
}
async getWeather(location) {
try {
const result = await exec({
command: `curl -s "wttr.in/${encodeURIComponent(location)}?format=3"`
})
return result.stdout.trim() || '天气数据暂不可用'
} catch (e) {
return '天气数据获取失败'
}
}
async getHotSearch() {
try {
// 使用 web_search 或 web_fetch 获取热搜
const search = await web_search({
query: '百度热搜榜',
count: 5
})
return search.results?.slice(0, 5).map((r, i) =>
`${i + 1}. [${r.title}](${r.url})`
).join('\n') || '热搜数据暂不可用'
} catch (e) {
return '热搜数据获取失败'
}
}
async getCalendarEvents() {
// 这里可以集成日历 API
// 简化处理
return '暂无日程安排'
}
async getPendingTodos() {
// 从 MEMORY.md 或任务系统获取
try {
const memory = await read({
path: '/home/pao/.openclaw/workspace/MEMORY.md'
})
const todos = memory.match(/\[ \] .*/g) || []
if (todos.length === 0) {
return '暂无待办事项'
}
return todos.slice(0, 5).map(t => t.replace('[ ] ', '- ')).join('\n')
} catch (e) {
return '待办数据获取失败'
}
}
async getDailyQuote() {
const quotes = [
'千里之行,始于足下。',
'不积跬步,无以至千里。',
'学而不思则罔,思而不学则殆。',
'知行合一。',
'每天进步一点点。'
]
const today = new Date()
const index = today.getDate() % quotes.length
return quotes[index]
}
}
// 使用
const briefing = new MorningBriefing()
await briefing.register()
// 手动触发
await briefing.generate()1.5 实战案例 2:晚报总结系统
javascript
class EveningSummary {
constructor() {
this.workLog = []
}
async register() {
await cron({
action: 'add',
job: {
name: '每日晚报',
schedule: {
kind: 'cron',
expr: '0 19 * * *', // 每天 19:00
tz: 'Asia/Shanghai'
},
payload: {
kind: 'agentTurn',
message: '生成每日工作总结',
timeoutSeconds: 300
},
sessionTarget: 'isolated',
delivery: {
mode: 'announce'
}
}
})
console.log('✅ 每日晚报任务已注册')
}
async generate() {
console.log('📝 生成每日工作总结...')
// 1. 读取今日日志
const today = new Date().toISOString().split('T')[0]
let dailyLog = ''
try {
dailyLog = await read({
path: `/home/pao/.openclaw/workspace/memory/${today}.md`
})
} catch (e) {
dailyLog = '今日无日志记录'
}
// 2. 使用 AI 总结
const summary = await sessions_spawn({
task: `基于以下今日日志生成工作总结:
${dailyLog}
包括:
1. 完成的主要工作(3-5 项)
2. 遇到的问题和解决
3. 学到的经验
4. 明日计划建议
格式简洁,使用 Markdown。`,
mode: 'run',
timeoutSeconds: 180
})
// 3. 发送总结
await message({
action: 'send',
target: '老大',
message: `# 每日工作总结\n\n日期:${today}\n\n${summary.output}`
})
console.log('✅ 晚报已发送')
return summary.output
}
}
// 使用
const summary = new EveningSummary()
await summary.register()二、Heartbeat 心跳机制
2.1 Heartbeat 基础
Heartbeat 是 OpenClaw 的周期性检查机制,用于:
- 批量检查多个数据源
- 保持会话活跃
- 执行周期性维护任务
2.2 配置 Heartbeat
yaml
# ~/.openclaw/workspace/HEARTBEAT.md
# 心跳检查清单
## 检查项目
- [ ] 邮箱 - 检查未读邮件
- [ ] 日历 - 查看 24 小时内事件
- [ ] 通知 - 检查重要提醒
- [ ] 天气 - 如有外出计划
## 检查频率
- 工作日:每 30 分钟
- 周末:每 2 小时
- 夜间(23:00-08:00):静默
## 注意事项
- 不要重复检查相同内容
- 记录检查状态到 memory/heartbeat-state.json
- 发现重要事项及时通知2.3 Heartbeat 实现
javascript
class HeartbeatManager {
constructor() {
this.stateFile = '/home/pao/.openclaw/workspace/memory/heartbeat-state.json'
this.state = this.loadState()
}
loadState() {
try {
const content = await read({ path: this.stateFile })
return JSON.parse(content)
} catch (e) {
return {
lastChecks: {
email: null,
calendar: null,
notifications: null,
weather: null
},
checkCount: 0
}
}
}
saveState() {
write({
path: this.stateFile,
content: JSON.stringify(this.state, null, 2)
})
}
// 处理心跳
async handleHeartbeat() {
console.log('💓 心跳检查...')
const now = Date.now()
const hour = new Date().getHours()
// 夜间静默
if (hour >= 23 || hour < 8) {
console.log('夜间静默,跳过检查')
return 'HEARTBEAT_OK'
}
const results = []
// 检查邮箱(每 30 分钟)
if (this.shouldCheck('email', 30 * 60 * 1000)) {
const emailResult = await this.checkEmail()
results.push({ type: 'email', ...emailResult })
this.state.lastChecks.email = now
}
// 检查日历(每 60 分钟)
if (this.shouldCheck('calendar', 60 * 60 * 1000)) {
const calendarResult = await this.checkCalendar()
results.push({ type: 'calendar', ...calendarResult })
this.state.lastChecks.calendar = now
}
// 检查通知(每 30 分钟)
if (this.shouldCheck('notifications', 30 * 60 * 1000)) {
const notificationResult = await this.checkNotifications()
results.push({ type: 'notifications', ...notificationResult })
this.state.lastChecks.notifications = now
}
// 检查天气(每 2 小时)
if (this.shouldCheck('weather', 2 * 60 * 60 * 1000)) {
const weatherResult = await this.checkWeather()
results.push({ type: 'weather', ...weatherResult })
this.state.lastChecks.weather = now
}
this.state.checkCount++
this.saveState()
// 汇总结果
if (results.some(r => r.hasNew)) {
await this.sendSummary(results)
}
return results.length > 0 ? `完成 ${results.length} 项检查` : 'HEARTBEAT_OK'
}
shouldCheck(type, interval) {
const lastCheck = this.state.lastChecks[type]
if (!lastCheck) return true
return Date.now() - lastCheck >= interval
}
async checkEmail() {
// 实现邮箱检查逻辑
return { hasNew: false, count: 0 }
}
async checkCalendar() {
// 实现日历检查逻辑
const upcomingEvents = [] // 获取 24 小时内事件
return {
hasNew: upcomingEvents.length > 0,
events: upcomingEvents
}
}
async checkNotifications() {
// 实现通知检查逻辑
return { hasNew: false, notifications: [] }
}
async checkWeather() {
try {
const result = await exec({
command: 'curl -s "wttr.in/驻马店?format=3"'
})
return {
hasNew: true,
weather: result.stdout.trim()
}
} catch (e) {
return { hasNew: false }
}
}
async sendSummary(results) {
let summary = '💓 心跳检查摘要\n\n'
for (const result of results) {
if (result.hasNew) {
summary += `## ${this.getTypeIcon(result.type)} ${result.type}\n`
if (result.events) {
summary += result.events.map(e => `- ${e.title} (${e.time})\n`).join('')
}
if (result.weather) {
summary += `${result.weather}\n`
}
summary += '\n'
}
}
if (summary.trim().split('\n').length > 2) {
await message({
action: 'send',
target: '老大',
message: summary
})
}
}
getTypeIcon(type) {
const icons = {
email: '📧',
calendar: '📅',
notifications: '🔔',
weather: '🌤️'
}
return icons[type] || '📌'
}
}
// 使用
const heartbeat = new HeartbeatManager()
// 在 HEARTBEAT.md 中配置的检查
// 收到心跳消息时调用
async function onHeartbeat(message) {
return await heartbeat.handleHeartbeat()
}2.4 实战案例 3:Heartbeat 与 Cron 对比
javascript
// Heartbeat vs Cron 选择指南
/*
使用 Heartbeat 当:
✅ 多个检查可以批量处理
✅ 需要会话上下文
✅ 时间可以略有漂移
✅ 想减少 API 调用
使用 Cron 当:
✅ 精确时间很重要
✅ 任务需要隔离
✅ 不同模型/配置
✅ 一次性提醒
*/
// 示例:批处理 vs 独立任务
// ❌ 不推荐:多个独立 cron 任务
await cron({ action: 'add', job: { name: '检查邮箱', schedule: { kind: 'every', everyMs: 1800000 } } })
await cron({ action: 'add', job: { name: '检查日历', schedule: { kind: 'every', everyMs: 1800000 } } })
await cron({ action: 'add', job: { name: '检查天气', schedule: { kind: 'every', everyMs: 1800000 } } })
// ✅ 推荐:使用 Heartbeat 批量处理
// 在 HEARTBEAT.md 中配置,一次检查所有项目三、工作流编排
3.1 顺序工作流
javascript
class SequentialWorkflow {
constructor(name) {
this.name = name
this.steps = []
}
addStep(name, fn, options = {}) {
this.steps.push({
name,
fn,
timeout: options.timeout || 300,
retries: options.retries || 0
})
return this
}
async execute(context = {}) {
console.log(`🚀 开始工作流:${this.name}`)
const results = {}
const startTime = Date.now()
for (let i = 0; i < this.steps.length; i++) {
const step = this.steps[i]
console.log(`步骤 ${i + 1}/${this.steps.length}: ${step.name}`)
try {
const result = await this.executeWithRetry(step, context)
results[step.name] = result
// 更新上下文
Object.assign(context, result.context || {})
} catch (error) {
console.error(`步骤失败:${step.name}`, error)
if (step.onFailure === 'continue') {
results[step.name] = { error: error.message }
} else {
throw new Error(`工作流在步骤 "${step.name}" 失败:${error.message}`)
}
}
}
const duration = Date.now() - startTime
console.log(`✅ 工作流完成,耗时:${duration}ms`)
return { results, context, duration }
}
async executeWithRetry(step, context) {
let lastError
for (let attempt = 0; attempt <= step.retries; attempt++) {
try {
return await Promise.race([
step.fn(context),
this.timeout(step.timeout * 1000)
])
} catch (error) {
lastError = error
if (attempt < step.retries) {
console.log(`重试 ${attempt + 1}/${step.retries}`)
await this.sleep(2000 * (attempt + 1))
}
}
}
throw lastError
}
timeout(ms) {
return new Promise((_, reject) => {
setTimeout(() => reject(new Error('超时')), ms)
})
}
sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms))
}
}
// 使用:文章发布工作流
const publishWorkflow = new SequentialWorkflow('文章发布')
.addStep('验证草稿', async (ctx) => {
const exists = await this.verifyDraft(ctx.filePath)
return { verified: exists, context: { ...ctx } }
})
.addStep('添加 frontmatter', async (ctx) => {
await this.addFrontmatter(ctx.filePath, ctx.title, ctx.date)
return { frontmatterAdded: true }
})
.addStep('更新侧边栏', async (ctx) => {
await this.updateSidebar(ctx.filePath, ctx.title)
return { sidebarUpdated: true }
})
.addStep('构建网站', async (ctx) => {
const result = await exec({ command: 'npm run build', timeout: 120 })
return { buildSuccess: result.exitCode === 0 }
})
.addStep('部署文件', async (ctx) => {
await exec({ command: 'rsync -av dist/ user@server:/var/www/' })
return { deployed: true }
})
.addStep('验证发布', async (ctx) => {
const response = await fetch(ctx.url)
return { verified: response.ok }
})
.addStep('提交 Git', async (ctx) => {
await exec({ command: 'git add -A && git commit -m "publish: ..." && git push' })
return { committed: true }
})
// 执行
const result = await publishWorkflow.execute({
filePath: '/docs/guide/article.md',
title: '新文章',
date: '2026-03-20',
url: 'https://example.com/guide/article.html'
})3.2 并行工作流
javascript
class ParallelWorkflow {
constructor(name) {
this.name = name
this.tasks = []
this.concurrency = 5
}
addTask(name, fn) {
this.tasks.push({ name, fn })
return this
}
async execute(context = {}) {
console.log(`🚀 开始并行工作流:${this.name}`)
console.log(`任务数:${this.tasks.length}, 并发数:${this.concurrency}`)
const results = []
const startTime = Date.now()
// 分批执行
for (let i = 0; i < this.tasks.length; i += this.concurrency) {
const batch = this.tasks.slice(i, i + this.concurrency)
console.log(`执行批次 ${Math.floor(i / this.concurrency) + 1}`)
const batchResults = await Promise.allSettled(
batch.map(task => task.fn(context).then(r => ({ name: task.name, result: r })))
)
results.push(...batchResults)
}
const duration = Date.now() - startTime
const success = results.filter(r => r.status === 'fulfilled').length
const failed = results.filter(r => r.status === 'rejected').length
console.log(`✅ 工作流完成:${success} 成功,${failed} 失败,耗时:${duration}ms`)
return {
results: results.map(r => r.status === 'fulfilled' ? r.value : { error: r.reason.message }),
success,
failed,
duration
}
}
}
// 使用:批量文章处理
const batchWorkflow = new ParallelWorkflow('批量文章处理')
for (const article of articles) {
batchWorkflow.addTask(article.title, async () => {
// 处理单篇文章
await processArticle(article)
return { processed: true, title: article.title }
})
}
const result = await batchWorkflow.execute()3.3 条件工作流
javascript
class ConditionalWorkflow {
constructor(name) {
this.name = name
this.branches = []
}
addBranch(condition, workflow) {
this.branches.push({ condition, workflow })
return this
}
async execute(context) {
console.log(`🚀 开始条件工作流:${this.name}`)
for (const branch of this.branches) {
const shouldRun = await branch.condition(context)
if (shouldRun) {
console.log(`执行分支:${branch.workflow.name}`)
return await branch.workflow.execute(context)
}
}
console.log('没有匹配的条件分支')
return { executed: false }
}
}
// 使用:根据文件类型选择处理流程
const fileWorkflow = new ConditionalWorkflow('文件处理')
.addBranch(
async (ctx) => ctx.fileType === 'image',
new SequentialWorkflow('图片处理')
.addStep('压缩', compressImage)
.addStep('添加水印', addWatermark)
.addStep('上传 CDN', uploadToCDN)
)
.addBranch(
async (ctx) => ctx.fileType === 'document',
new SequentialWorkflow('文档处理')
.addStep('转换格式', convertFormat)
.addStep('提取文本', extractText)
.addStep('建立索引', buildIndex)
)
.addBranch(
async (ctx) => ctx.fileType === 'video',
new SequentialWorkflow('视频处理')
.addStep('转码', transcode)
.addStep('生成缩略图', generateThumbnail)
.addStep('上传存储', uploadToStorage)
)
// 执行
const result = await fileWorkflow.execute({
fileType: 'image',
filePath: '/path/to/image.jpg'
})3.4 实战案例 4:完整的内容发布系统
javascript
class ContentPublishingSystem {
constructor() {
this.workflows = {
article: this.createArticleWorkflow(),
video: this.createVideoWorkflow(),
podcast: this.createPodcastWorkflow()
}
}
createArticleWorkflow() {
return new SequentialWorkflow('文章发布')
.addStep('验证内容', async (ctx) => {
const content = await read({ path: ctx.filePath })
if (content.length < 1000) {
throw new Error('文章内容过短')
}
return { wordCount: content.length }
})
.addStep('AI 审校', async (ctx) => {
const content = await read({ path: ctx.filePath })
const review = await sessions_spawn({
task: `审校以下文章:
- 语法错误
- 逻辑问题
- 改进建议
${content}`,
mode: 'run'
})
return { review: review.output }
})
.addStep('添加元数据', async (ctx) => {
await this.addFrontmatter(ctx)
return { frontmatterAdded: true }
})
.addStep('更新导航', async (ctx) => {
await this.updateNavigation(ctx)
return { navigationUpdated: true }
})
.addStep('构建部署', async (ctx) => {
await exec({ command: 'npm run build && npm run deploy', timeout: 300 })
return { deployed: true }
})
.addStep('通知订阅', async (ctx) => {
await this.notifySubscribers(ctx)
return { notified: true }
})
}
createVideoWorkflow() {
return new SequentialWorkflow('视频发布')
.addStep('视频转码', async (ctx) => {
await exec({ command: `ffmpeg -i ${ctx.input} -c:v libx264 ${ctx.output}` })
return { transcoded: true }
})
.addStep('生成缩略图', async (ctx) => {
await exec({ command: `ffmpeg -i ${ctx.input} -ss 00:00:05 -vframes 1 ${ctx.thumbnail}` })
return { thumbnailGenerated: true }
})
.addStep('上传视频', async (ctx) => {
await this.uploadVideo(ctx.output)
return { uploaded: true }
})
.addStep('创建页面', async (ctx) => {
await this.createVideoPage(ctx)
return { pageCreated: true }
})
}
async publish(type, context) {
const workflow = this.workflows[type]
if (!workflow) {
throw new Error(`未知内容类型:${type}`)
}
console.log(`📤 开始发布 ${type} 内容`)
try {
const result = await workflow.execute(context)
// 记录发布历史
await this.recordPublish(type, context, result)
return { success: true, ...result }
} catch (error) {
console.error('发布失败:', error)
// 通知失败
await message({
action: 'send',
target: '老大',
message: `❌ 内容发布失败\n\n类型:${type}\n错误:${error.message}\n时间:${new Date().toLocaleString('zh-CN')}`
})
return { success: false, error: error.message }
}
}
async recordPublish(type, context, result) {
const entry = {
type,
context,
result,
timestamp: Date.now()
}
// 追加到发布历史
const historyPath = '/home/pao/.openclaw/workspace/memory/publish-history.jsonl'
await exec({
command: `echo '${JSON.stringify(entry)}' >> ${historyPath}`
})
}
async notifySubscribers(ctx) {
// 实现订阅通知逻辑
await message({
action: 'send',
target: 'content-subscribers',
message: `📰 新文章发布:${ctx.title}\n\n${ctx.url}`
})
}
}
// 使用
const publisher = new ContentPublishingSystem()
// 发布文章
const result = await publisher.publish('article', {
filePath: '/docs/guide/new-article.md',
title: '新文章标题',
url: 'https://example.com/guide/new-article.html'
})
console.log('发布结果:', result)四、事件驱动自动化
4.1 文件系统监听
javascript
class FileWatcher {
constructor(watchPath, handler) {
this.watchPath = watchPath
this.handler = handler
this.watching = false
}
async start() {
this.watching = true
console.log(`开始监听:${this.watchPath}`)
// 使用 inotifywait 或 fs.watch
const process = await exec({
command: `inotifywait -m -r -e modify,create,delete ${this.watchPath}`,
background: true
})
// 处理事件
process.on('output', (output) => {
const event = this.parseEvent(output)
if (event) {
this.handler(event)
}
})
}
parseEvent(output) {
// 解析 inotifywait 输出
const match = output.match(/(\w+)\s+(\w+)\s+(.+)/)
if (match) {
return {
directory: match[1],
event: match[2],
file: match[3]
}
}
return null
}
}
// 使用:监听文章目录
const watcher = new FileWatcher('/home/pao/projects/ai-knowledge-base/docs/guide', async (event) => {
console.log('文件事件:', event)
if (event.event === 'CREATE' && event.file.endsWith('.md')) {
// 新文章创建,触发处理流程
await processNewArticle(event.file)
}
})
await watcher.start()4.2 消息事件监听
javascript
class MessageEventListener {
constructor() {
this.listeners = new Map()
}
on(pattern, handler) {
this.listeners.set(pattern, handler)
}
async handleMessage(message) {
for (const [pattern, handler] of this.listeners) {
if (this.matchPattern(message.content, pattern)) {
await handler(message)
return
}
}
}
matchPattern(content, pattern) {
// 简单的模式匹配
if (pattern.startsWith('/') && pattern.endsWith('/')) {
const regex = new RegExp(pattern.slice(1, -1))
return regex.test(content)
}
return content.includes(pattern)
}
}
// 使用
const listener = new MessageEventListener()
// 监听发布命令
listener.on('/publish', async (message) => {
const filePath = message.content.split(' ')[1]
await publisher.publish('article', { filePath })
})
// 监听天气查询
listener.on('天气', async (message) => {
const location = message.content.replace('天气', '').trim() || '驻马店'
const weather = await getWeather(location)
await message({ action: 'send', replyTo: message.id, message: weather })
})五、最佳实践
5.1 错误处理
javascript
// 始终设置超时
await sessions_spawn({
task: '...',
timeoutSeconds: 300 // 5 分钟
})
// 实现重试
async function withRetry(fn, maxRetries = 3) {
for (let i = 0; i < maxRetries; i++) {
try {
return await fn()
} catch (error) {
if (i === maxRetries - 1) throw error
await sleep(2000 * (i + 1))
}
}
}
// 保存状态支持恢复
async function saveState(state) {
await write({
path: '/tmp/workflow-state.json',
content: JSON.stringify(state)
})
}5.2 日志记录
javascript
class WorkflowLogger {
constructor(name) {
this.name = name
this.logs = []
this.startTime = Date.now()
}
log(step, status, details = {}) {
this.logs.push({
step,
status,
timestamp: Date.now(),
details
})
console.log(`[${this.name}] ${step}: ${status}`)
}
generateReport() {
const duration = Date.now() - this.startTime
const failed = this.logs.filter(l => l.status === 'failed')
return {
workflow: this.name,
duration,
totalSteps: this.logs.length,
failedSteps: failed.length,
logs: this.logs
}
}
}5.3 性能优化
javascript
// 限制并发数
const semaphore = { count: 0, max: 5, queue: [] }
async function acquire() {
if (semaphore.count >= semaphore.max) {
await new Promise(resolve => semaphore.queue.push(resolve))
}
semaphore.count++
}
async function release() {
semaphore.count--
if (semaphore.queue.length > 0) {
semaphore.queue.shift()()
}
}
// 使用
await acquire()
try {
await processTask()
} finally {
await release()
}六、总结
核心要点
- Cron 用于精确时间调度
- Heartbeat 用于批量周期检查
- 工作流编排复杂任务
- 事件驱动响应外部变化
- 错误处理和日志至关重要
选择指南
| 需求 | 推荐方案 |
|---|---|
| 每天固定时间 | Cron |
| 周期性检查 | Heartbeat |
| 多步骤任务 | 顺序工作流 |
| 批量处理 | 并行工作流 |
| 条件执行 | 条件工作流 |
| 实时响应 | 事件驱动 |
🟢🐉 开始构建你的自动化工作流吧!